package mws;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Base class for a service whose {@link Runnable#run()} body is executed on a dedicated thread.
 *
 * <p>{@link #start()} creates and starts the thread, {@link #shutdown()} flags it as stopped, wakes it up
 * and (for non-daemon threads) waits for it to finish. Because the {@code started} flag is flipped back by
 * {@link #shutdown(boolean)}, the same instance can be started again afterwards.
 *
 * <p>Subclasses implement {@code run()}, typically polling {@link #isStopped()} and pausing between
 * iterations with {@link #waitForRunning(long)}, which can be cut short by {@link #wakeup()}.
 */
public abstract class ServiceThread implements Runnable {
    protected static final Logger log = LoggerFactory.getLogger(ServiceThread.class);

    /** Default maximum time, in milliseconds, that {@link #shutdown(boolean)} waits for the thread to end. */
    private static final long JOIN_TIME = 90 * 1000;

    /**
     * Thread created by the most recent successful {@link #start()}; {@code null} before the first start.
     * Volatile because {@code start()} and {@code shutdown()} may be called from different threads.
     */
    protected volatile Thread thread;

    /** One-shot latch the service thread blocks on inside {@link #waitForRunning(long)}. */
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);

    /** {@code true} while a {@link #wakeup()} request is pending and has not been consumed yet. */
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);

    /** Stop flag set by {@link #shutdown(boolean)} and {@link #makeStop()}, cleared by {@link #start()}. */
    protected volatile boolean stopped = false;

    /** Daemon setting applied to the thread created by {@link #start()}. */
    protected boolean isDaemon = false;

    // Guards start/shutdown against duplicate calls and makes the thread restartable.
    private final AtomicBoolean started = new AtomicBoolean(false);

    public ServiceThread() {

    }

    /**
     * Returns the name of this service; it is used as the thread name and in log messages.
     *
     * @return the service name
     */
    public abstract String getServiceName();

    /**
     * Starts the service on a new thread named after {@link #getServiceName()}.
     *
     * <p>Does nothing (apart from logging) if the service is already started.
     */
    public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        // Fully configure the thread before publishing it through the field.
        final Thread worker = new Thread(this, getServiceName());
        worker.setDaemon(isDaemon);
        this.thread = worker;
        worker.start();
        log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
    }

    /**
     * Shuts the service down without interrupting its thread; equivalent to {@code shutdown(false)}.
     */
    public void shutdown() {
        this.shutdown(false);
    }

    /**
     * Stops the service: sets the stop flag, wakes the thread up, optionally interrupts it and, if the
     * thread is not a daemon, waits up to {@link #getJoinTime()} milliseconds for it to terminate.
     *
     * <p>Does nothing (apart from logging) if the service is not started. If the calling thread is
     * interrupted while waiting, the wait is abandoned and the caller's interrupt status is restored.
     *
     * @param interrupt whether to interrupt the service thread in addition to waking it up
     */
    public void shutdown(final boolean interrupt) {
        log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(true, false)) {
            return;
        }
        this.stopped = true;
        log.info("shutdown thread[{}] interrupt={} ", getServiceName(), interrupt);

        // If the thread is waiting, wake it up so it can observe the stop flag.
        wakeup();

        // Read the field once: a concurrent start() may have flipped 'started' but not yet assigned the thread.
        final Thread worker = this.thread;
        if (worker == null) {
            log.warn("shutdown thread[{}] skipped join: thread has not been created yet", getServiceName());
            return;
        }

        try {
            if (interrupt) {
                worker.interrupt();
            }

            // getJoinTime() is overridable; read it once so the logged value is the one actually used.
            final long joinTime = this.getJoinTime();
            long beginTime = System.currentTimeMillis();
            if (!worker.isDaemon()) {
                worker.join(joinTime);
            }
            long elapsedTime = System.currentTimeMillis() - beginTime;
            log.info("join thread[{}], elapsed time: {}ms, join time:{}ms", getServiceName(), elapsedTime, joinTime);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
            // Preserve the caller's interrupt status instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the maximum time {@link #shutdown(boolean)} waits for the service thread to terminate.
     *
     * @return the join timeout in milliseconds
     */
    public long getJoinTime() {
        return JOIN_TIME;
    }

    /**
     * Sets the stop flag without waking up, interrupting or joining the service thread.
     *
     * <p>Does nothing if the service is not started.
     */
    public void makeStop() {
        if (!started.get()) {
            return;
        }
        this.stopped = true;
        log.info("makestop thread[{}] ", this.getServiceName());
    }

    /**
     * Requests that a current or upcoming {@link #waitForRunning(long)} returns early.
     *
     * <p>Repeated calls before the request is consumed are coalesced into one.
     */
    public void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

    /**
     * Pauses the calling thread for up to {@code interval} milliseconds, or until {@link #wakeup()} is
     * called or the thread is interrupted.
     *
     * <p>If a wakeup request is already pending it is consumed and the method returns without waiting.
     * {@link #onWaitEnd()} is invoked before returning in every case.
     *
     * @param interval the maximum time to wait, in milliseconds
     */
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        // Re-arm the latch before blocking on it.
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Intentionally not re-asserting the interrupt flag: callers usually invoke this method in a
            // loop, and a set flag would make every subsequent await fail immediately.
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

    /**
     * Hook invoked at the end of every {@link #waitForRunning(long)} call; the default does nothing.
     */
    protected void onWaitEnd() {
    }

    /**
     * Returns whether the stop flag is set.
     *
     * @return {@code true} if a stop has been requested
     */
    public boolean isStopped() {
        return stopped;
    }

    /**
     * Returns the daemon setting that {@link #start()} applies to the service thread.
     *
     * @return {@code true} if the thread is created as a daemon
     */
    public boolean isDaemon() {
        return isDaemon;
    }

    /**
     * Sets the daemon setting used by the next {@link #start()}.
     *
     * @param daemon {@code true} to create the service thread as a daemon
     */
    public void setDaemon(boolean daemon) {
        isDaemon = daemon;
    }

    /**
     * A count-down latch that, unlike a one-shot latch, can be re-armed to its initial count with
     * {@link #reset()}.
     *
     * <p>Declared {@code static}: it uses no state of the enclosing {@code ServiceThread}, and a static
     * member class inside a non-static inner class does not compile before Java 16.
     */
    public static class CountDownLatch2 {
        private final Sync sync;

        /**
         * Constructs a {@code CountDownLatch2} initialized with the given count.
         *
         * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link
         * #await}
         * @throws IllegalArgumentException if {@code count} is negative
         */
        public CountDownLatch2(int count) {
            if (count < 0) {
                throw new IllegalArgumentException("count < 0");
            }
            this.sync = new Sync(count);
        }

        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>If the current count is zero then this method returns immediately.
         *
         * <p>If the current count is greater than zero then the current
         * thread becomes disabled for thread scheduling purposes and lies
         * dormant until one of two things happen:
         * <ul>
         * <li>The count reaches zero due to invocations of the
         * {@link #countDown} method; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * @throws InterruptedException if the current thread is interrupted while waiting
         */
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
         * or the specified waiting time elapses.
         *
         * <p>If the current count is zero then this method returns immediately
         * with the value {@code true}.
         *
         * <p>If the current count is greater than zero then the current
         * thread becomes disabled for thread scheduling purposes and lies
         * dormant until one of three things happen:
         * <ul>
         * <li>The count reaches zero due to invocations of the
         * {@link #countDown} method; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread; or
         * <li>The specified waiting time elapses.
         * </ul>
         *
         * <p>If the count reaches zero then the method returns with the
         * value {@code true}.
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * <p>If the specified waiting time elapses then the value {@code false}
         * is returned.  If the time is less than or equal to zero, the method
         * will not wait at all.
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the {@code timeout} argument
         * @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count
         * reached zero
         * @throws InterruptedException if the current thread is interrupted while waiting
         */
        public boolean await(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         *
         * <p>If the current count is greater than zero then it is decremented.
         * If the new count is zero then all waiting threads are re-enabled for
         * thread scheduling purposes.
         *
         * <p>If the current count equals zero then nothing happens.
         */
        public void countDown() {
            sync.releaseShared(1);
        }

        /**
         * Returns the current count.
         *
         * <p>This method is typically used for debugging and testing purposes.
         *
         * @return the current count
         */
        public long getCount() {
            return sync.getCount();
        }

        /**
         * Restores the count to the value this latch was constructed with.
         */
        public void reset() {
            sync.reset();
        }

        /**
         * Returns a string identifying this latch, as well as its state.
         * The state, in brackets, includes the String {@code "Count ="}
         * followed by the current count.
         *
         * @return a string identifying this latch, as well as its state
         */
        @Override
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }

        /**
         * Synchronization control For CountDownLatch2.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;

            /** Count supplied at construction; {@link #reset()} restores the state to this value. */
            private final int startCount;

            Sync(int count) {
                this.startCount = count;
                setState(count);
            }

            int getCount() {
                return getState();
            }

            /** Acquisition succeeds only once the count has reached zero. */
            @Override
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }

            @Override
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (; ; ) {
                    int c = getState();
                    if (c == 0) {
                        return false;
                    }
                    int nextc = c - 1;
                    if (compareAndSetState(c, nextc)) {
                        return nextc == 0;
                    }
                }
            }

            /** Unconditionally sets the state back to the initial count. */
            protected void reset() {
                setState(startCount);
            }
        }
    }

}
